package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/segmentio/kafka-go"

	"saasems/services"
)

// startKafkaConsumer reads messages from the "mqtt-data" topic using the
// "data-processor" consumer group, decodes each payload as a JSON object and
// hands the result to services.AnalyzeBusinessData.
//
// It blocks indefinitely: the read loop has no exit condition. Read and decode
// failures are logged and the loop moves on to the next message.
func startKafkaConsumer() {
	// Pause between attempts after a failed read so that a persistent broker
	// or network error does not turn the loop into a busy spin that floods
	// the log.
	const retryDelay = time.Second

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    "mqtt-data",
		GroupID:  "data-processor",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	// The loop below never returns, so this only runs when a panic unwinds
	// the function; closing the reader there still releases its connections.
	defer func() {
		if err := reader.Close(); err != nil {
			log.Printf("Error closing Kafka reader: %v", err)
		}
	}()

	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("Error reading from Kafka: %v", err)
			time.Sleep(retryDelay)
			continue
		}

		var data map[string]interface{}
		if err := json.Unmarshal(msg.Value, &data); err != nil {
			// A malformed payload is skipped; it is not retried.
			log.Printf("Error parsing Kafka message: %v", err)
			continue
		}

		// Store the real-time data (not implemented yet):
		//db := database.GetDB()
		//// ... database storage logic ...
		//fmt.Println(db)

		// Run the business data analysis on the decoded payload.
		services.AnalyzeBusinessData(data)
	}
}
